package com.ld.shieldsb.canalclient.handler;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Predicate;

import org.apache.commons.lang.StringUtils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.ld.shieldsb.canalclient.handler.config.MappingConfig;
import com.ld.shieldsb.canalclient.handler.config.MappingConfig.DbMapping;
import com.ld.shieldsb.canalclient.model.Dml;
import com.ld.shieldsb.canalclient.model.SingleDml;
import com.ld.shieldsb.canalclient.model.SyncItem;
import com.ld.shieldsb.canalclient.model.SyncResult;
import com.ld.shieldsb.canalclient.recoder.Recorder;
import com.ld.shieldsb.canalclient.util.CanalUtil;
import com.ld.shieldsb.common.core.model.Result;
import com.ld.shieldsb.common.core.util.ResultUtil;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Data
public abstract class BaseSyncService {

    protected int threads = 3; // 默认线程数
    protected boolean skipDupException; // 跳过异常

    protected List<SyncItem>[] dmlsPartition;
    protected ExecutorService[] executorThreads; // 线程池

    public BaseSyncService() {
        this(null, true);
    }

    @SuppressWarnings("unchecked")
    public BaseSyncService(Integer threads, boolean skipDupException) {
        this.skipDupException = skipDupException;
        if (threads != null) {
            this.threads = threads;
        }
        this.dmlsPartition = new List[this.threads];
        this.executorThreads = new ExecutorService[this.threads];
        for (int i = 0; i < this.threads; i++) {
            dmlsPartition[i] = new ArrayList<>();
            executorThreads[i] = Executors.newSingleThreadExecutor();
        }
    }

    /**
     * 批量同步回调
     *
     * @param dmls
     *            批量 DML
     * @param function
     *            回调方法
     */
    public void sync(List<Dml> dmls, Predicate<Dml> function, Recorder recorder, String key) {
        try {
            boolean toExecute = false;
            for (Dml dml : dmls) {
                if (!toExecute) {
                    toExecute = function.test(dml);
                } else {
                    function.test(dml);
                }
            }
            if (toExecute) {
                List<Future<Boolean>> futures = new ArrayList<>();
                for (int i = 0; i < threads; i++) {
                    int j = i;
                    if (dmlsPartition[j].isEmpty()) {
                        // bypass
                        continue;
                    }
                    futures.add(executorThreads[i].submit(() -> {
                        try {
                            dmlsPartition[j].forEach(syncItem -> sync(j, syncItem.getConfig(), syncItem.getSingleDml(), recorder));
                            dmlsPartition[j].clear();
                            submitSuccess(j); // 提交成功的回调
                            return true;
                        } catch (Throwable e) {
                            dmlsPartition[j].clear();
                            submitError(j); // 提交失败的回调
                            throw new RuntimeException(e);
                        }
                    }));
                }

                futures.forEach(future -> {
                    try {
                        future.get();
                    } catch (ExecutionException | InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                });
            }
        } finally {
            syncComplete(); // 完成的回调
        }
    }

    public void sync(Map<String, Map<String, MappingConfig>> mappingConfig, List<Dml> dmls, Recorder recorder, String key) {
        sync(dmls, dml -> {
            if (dml.getIsDdl() != null && dml.getIsDdl() && StringUtils.isNotEmpty(dml.getSql())) {
                // DDL 结构变化
                return false;
            } else {
                // DML 数据变化
                String destination = StringUtils.trimToEmpty(dml.getDestination());
                // String groupId = StringUtils.trimToEmpty(dml.getGroupId()); // 待使用
                String database = dml.getDatabase();
                String table = dml.getTable();
                Map<String, MappingConfig> configMap = mappingConfig.get(destination + "_" + database + "-" + table);

                if (configMap == null || configMap.values().isEmpty()) {
                    return false;
                }

                for (MappingConfig config : configMap.values()) {
                    boolean caseInsensitive = config.getDbMapping().isCaseInsensitive();
                    if (config.isConcurrent()) { // 是否并行同步，即多线程
                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml, caseInsensitive);
                        singleDmls.forEach(singleDml -> {
                            int hash = pkHash(config.getDbMapping(), singleDml.getData());
                            SyncItem syncItem = new SyncItem(config, singleDml);
                            dmlsPartition[hash].add(syncItem);
                        });
                    } else {
                        int hash = 0;
                        List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml, caseInsensitive);
                        singleDmls.forEach(singleDml -> {
                            SyncItem syncItem = new SyncItem(config, singleDml);
                            dmlsPartition[hash].add(syncItem);
                        });
                    }
                }

                // 记录有效Dml信息,及表名库名符合条件的，但是不一定处理，因为后面还有字段限制
                if (recorder != null) {
                    try {
                        recorder.recordValidDml(dml, key); // 记录Dml，注意一定要获取，否则队列满了之后就阻塞了
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                return true;
            }
        }, recorder, key);
    }

    /**
     * 取主键hash
     */
    public int pkHash(DbMapping dbMapping, Map<String, Object> d) {
        return pkHash(dbMapping, d, null);
    }

    public int pkHash(DbMapping dbMapping, Map<String, Object> d, Map<String, Object> o) {
        int hash = 0;
        // 取主键
        for (Map.Entry<String, String> entry : dbMapping.getTargetPk().entrySet()) {
            String targetColumnName = entry.getKey();
            String srcColumnName = entry.getValue();
            if (srcColumnName == null) {
                srcColumnName = CanalUtil.cleanColumn(targetColumnName);
            }
            Object value = null;
            if (o != null && o.containsKey(srcColumnName)) {
                value = o.get(srcColumnName);
            } else if (d != null) {
                value = d.get(srcColumnName);
            }
            if (value != null) {
                hash += value.hashCode();
            }
        }
        hash = Math.abs(hash) % threads; // 绝对值 取余，放到对应的线程中
        return Math.abs(hash);
    }

    /**
     * 单条 dml 同步
     *
     * @param batchExecutor
     *            批量事务执行器
     * @param config
     *            对应配置对象
     * @param dml
     *            DML
     */
    public SyncResult sync(int index, MappingConfig config, SingleDml dml, Recorder recorder) {
        SyncResult flag = new SyncResult();
        if (config != null) {
            try {
                String type = dml.getType();
                if (type != null && type.equalsIgnoreCase("INSERT")) {
                    flag = insert(index, config, dml); // 插入
                } else if (type != null && type.equalsIgnoreCase("UPDATE")) {
                    flag = update(index, config, dml); // 更新
                } else if (type != null && type.equalsIgnoreCase("DELETE")) {
                    flag = delete(index, config, dml); // 删除
                } else if (type != null && type.equalsIgnoreCase("TRUNCATE")) {
                    // 清空表
                    flag = truncate(index, config);
                }
                if (log.isDebugEnabled()) {
                    log.debug("DML: {}", JSON.toJSONString(dml, SerializerFeature.WriteMapNullValue));
                }
                // 记录器
                if (recorder != null) {
                    recorder.recordSync(dml, config, flag);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        return flag;
    }

    /**
     * 同步单条Dml数据，用于同步失败时的手动同步数据
     * 
     * @Title syncSingle
     * @author 吕凯
     * @date 2021年12月25日 下午5:05:58
     * @param mappingConfig
     * @param key
     * @param dml
     * @param dataIndex
     * @param targetDb
     * @param targetTable
     * @return Result
     */
    public Result syncSingle(Map<String, Map<String, MappingConfig>> mappingConfig, String key, Dml dml, int dataIndex, String targetDb,
            String targetTable) {
        Result result = new Result();
        try {
            MappingConfig config = null;
            targetDb = (targetDb == null ? "" : targetDb);
            targetTable = (targetTable == null ? "" : targetTable);

            // DML 数据变化
            String destination = StringUtils.trimToEmpty(dml.getDestination());
            // String groupId = StringUtils.trimToEmpty(dml.getGroupId()); // 待使用
            String database = dml.getDatabase();
            String table = dml.getTable();
            Map<String, MappingConfig> configMap = mappingConfig.get(destination + "_" + database + "-" + table);

            if (configMap == null || configMap.values().isEmpty()) {
                return ResultUtil.error("配置参数为空！");
            }
            SingleDml singleDml = null;
            for (MappingConfig configM : configMap.values()) {
                boolean caseInsensitive = configM.getDbMapping().isCaseInsensitive();
                DbMapping dbMapping = configM.getDbMapping();
                String mappingTargetTable = (dbMapping.getTargetTable() == null ? "" : dbMapping.getTargetTable());
                String mappingTargetDb = (dbMapping.getTargetDb() == null ? "" : dbMapping.getTargetDb());
                if (mappingTargetDb.equalsIgnoreCase(targetDb) && mappingTargetTable.equalsIgnoreCase(targetTable)) {
                    config = configM;
                    List<SingleDml> singleDmls = SingleDml.dml2SingleDmls(dml, caseInsensitive);
                    singleDml = singleDmls.get(dataIndex);
                }
            }
            // 不为空
            if (singleDml != null) {
                SyncResult flag = sync(-1, config, singleDml, null);
                if (flag.isSuccess()) {
                    result.setSuccess(true);
                    result.setMessage("同步成功！");
                } else {
                    result.setMessage(flag.getMsg());
                }
            } else {
                result.setMessage("数据未找到！");
            }
        } catch (Exception e) {
            result.setSuccess(false);
            result.setMessage("同步失败！" + e.getMessage());
            log.error("[" + key + "]数据库同步单条数据失败！", e);
        }
        return result;

    }

    /**
     * 插入操作
     *
     * @param config
     *            配置项
     * @param dml
     *            DML数据
     */
    public abstract SyncResult insert(int index, MappingConfig config, SingleDml dml) throws Exception;

    /**
     * 更新
     * 
     * @Title update
     * @author 吕凯
     * @date 2021年12月6日 下午5:46:57
     * @param solrClient
     * @param config
     * @param dml
     * @throws Exception
     *             void
     */
    public abstract SyncResult update(int index, MappingConfig config, SingleDml dml) throws Exception;

    /**
     * 删除数据
     * 
     * @Title delete
     * @author 吕凯
     * @date 2021年12月6日 下午4:44:23
     * @param solrClient
     * @param config
     * @param dml
     * @throws SQLException
     *             void
     */
    public abstract SyncResult delete(int index, MappingConfig config, SingleDml dml) throws Exception;

    public abstract SyncResult truncate(int index, MappingConfig config) throws SQLException;

    public abstract void submitSuccess(int index) throws Exception; // 多线程提交成功

    public abstract void submitError(int index) throws Exception; // 多线程提交失败

    public abstract void syncComplete(); // 完成的回调

    /**
     * 关闭，释放资源
     * 
     * @Title destroy
     * @author 吕凯
     * @date 2021年12月6日 下午6:06:12 void
     */
    public abstract void close();

    /**
     * 关闭资源供子类调用
     * 
     * @Title closeResources
     * @author 吕凯
     * @date 2021年12月16日 上午9:18:12 void
     */
    public void closeResources() {
        for (int i = 0; i < threads; i++) {
            executorThreads[i].shutdown();
        }
    }
}
